/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.service.impl;

import com.chinamobile.cmss.lakehouse.api.dto.DataSourceBean;
import com.chinamobile.cmss.lakehouse.api.service.DataSourceService;
import com.chinamobile.cmss.lakehouse.api.service.LinkService;
import com.chinamobile.cmss.lakehouse.api.service.LinkServiceSelector;
import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.enums.Status;
import com.chinamobile.cmss.lakehouse.common.utils.BeanUtils;
import com.chinamobile.cmss.lakehouse.common.utils.ParameterUtils;
import com.chinamobile.cmss.lakehouse.common.utils.Result;
import com.chinamobile.cmss.lakehouse.dao.DataSourceDao;
import com.chinamobile.cmss.lakehouse.dao.entity.DataSourceEntity;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import javax.annotation.Resource;
import javax.persistence.criteria.Predicate;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class DataSourceServiceImpl extends BaseServiceImpl implements DataSourceService {

    @Resource
    private DataSourceDao dataSourceDao;

    @Resource
    private LinkServiceSelector linkServiceSelector;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Map<String, Object> create(String userId, DataSourceBean dataSourceBean) {
        Map<String, Object> result = new HashMap<>();

        Optional<DataSourceEntity> sameNameEntity = dataSourceDao.findByNameAndCreateUserId(dataSourceBean.getName(), userId);
        if (sameNameEntity.isPresent()) {
            putMessage(result, Status.DATASOURCE_NAME_EXIST);
            return result;
        }

        LinkService linkService = linkServiceSelector.getLinkService(dataSourceBean.getDataSourceType());
        linkService.validate(dataSourceBean);

        Date now = new Date();
        dataSourceBean.setCreateUserId(userId);
        dataSourceBean.setUpdateUserId(userId);
        dataSourceBean.setCreateTime(now);
        dataSourceBean.setUpdateTime(now);
        dataSourceBean.setUrl(linkService.appendUrl(dataSourceBean));
        DataSourceEntity dataSourceEntity = BeanUtils.copyFrom(dataSourceBean, DataSourceEntity.class);

        dataSourceEntity = dataSourceDao.save(dataSourceEntity);
        result.put(Constants.DATA_LIST, dataSourceEntity);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    @Override
    public Map<String, Object> get(String userId, Long id) {
        Map<String, Object> result = new HashMap<>();

        Optional<DataSourceEntity> dataSourceEntity = dataSourceDao.findByIdAndCreateUserId(id, userId);
        if (!dataSourceEntity.isPresent()) {
            putMessage(result, Status.DATASOURCE_NOT_EXIST);
            return result;
        }

        result.put(Constants.DATA_LIST, dataSourceEntity);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Map<String, Object> update(String userId, Long id, DataSourceBean dataSourceBean) {
        Map<String, Object> result = new HashMap<>();

        DataSourceEntity existEntity = dataSourceDao.getById(Long.valueOf(id));
        if (existEntity == null) {
            putMessage(result, Status.DATASOURCE_NOT_EXIST);
            return result;
        }

        if (!StringUtils.equals(dataSourceBean.getName(), existEntity.getName())) {
            Optional<DataSourceEntity> sameNameEntity = dataSourceDao.findByNameAndCreateUserId(dataSourceBean.getName(), userId);
            if (sameNameEntity.isPresent() && (!sameNameEntity.get().getId().equals(existEntity.getId()))) {
                putMessage(result, Status.DATASOURCE_NAME_EXIST);
                return result;
            }
        }

        LinkService linkService = linkServiceSelector.getLinkService(dataSourceBean.getDataSourceType());
        linkService.validate(dataSourceBean);

        Date now = new Date();
        dataSourceBean.setId(existEntity.getId());
        BeanUtils.copyProperties(dataSourceBean, existEntity);
        existEntity.setUpdateUserId(userId);
        existEntity.setUpdateTime(now);

        existEntity = dataSourceDao.save(existEntity);
        result.put(Constants.DATA_LIST, existEntity);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Map<String, Object> delete(String userId, Long id) {
        Map<String, Object> result = new HashMap<>();

        DataSourceEntity existEntity = dataSourceDao.getById(id);
        if (existEntity == null) {
            putMessage(result, Status.DATASOURCE_NOT_EXIST);
            return result;
        }

        dataSourceDao.delete(existEntity);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    @Override
    public Map<String, Object> testConnection(String userId, DataSourceBean dataSourceBean) {
        Map<String, Object> result = new HashMap<>();
        Optional<DataSourceEntity> existEntity = Optional.empty();

        if (null != dataSourceBean.getId()) {
            existEntity = dataSourceDao.findByIdAndCreateUserId(dataSourceBean.getId(), userId);
        }
        existEntity = Optional.ofNullable(existEntity.orElse(BeanUtils.copyFrom(dataSourceBean, DataSourceEntity.class)));

        try {
            LinkService linkService = linkServiceSelector.getLinkService(dataSourceBean.getDataSourceType());
            BeanUtils.copyProperties(existEntity.get(), dataSourceBean);
            existEntity.get().setUrl(linkService.appendUrl(dataSourceBean));

            boolean connected = linkService.isConnected(existEntity.get());
            result.put(Constants.DATA_LIST, connected);
            putMessage(result, connected ? Status.SUCCESS : Status.DATASOURCE_TEST_FAILED);
        } catch (RuntimeException rex) {
            log.error("connect to the datasource failure. " + rex.getMessage(), rex);
            result.put(Constants.DATA_LIST, Boolean.FALSE);
            putMessage(result, Status.DATASOURCE_TEST_FAILED);
        }

        return result;
    }

    @Override
    public Result list(String userId, String searchVal, Integer pageNo, Integer pageSize) {
        Result<Object> result = new Result<>();

        PageRequest pageRequest = PageRequest.of(pageNo - 1, pageSize, Sort.Direction.DESC, "id");
        Specification<DataSourceEntity> specification = buildSpecification(userId, searchVal);
        Page<DataSourceEntity> pageInfo = dataSourceDao.findAll(specification, pageRequest);
        result.setData(pageInfo);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    private Specification<DataSourceEntity> buildSpecification(String userId, String searchVal) {
        return (root, criteriaQuery, cBuilder) -> {
            //define a predicate
            Predicate p = cBuilder.and(cBuilder.equal(root.get("createUserId"), userId));
            if (StringUtils.isNotBlank(searchVal)) {
                p = cBuilder.and(p, cBuilder.like(root.get("name"), "%" + ParameterUtils.replaceSpecialChars(searchVal) + "%"));
            }
            return p;
        };
    }
}
